﻿using System;
using System.Collections.Generic;
using System.Threading;
using vJine.Core.IoC;
using vJine.Core.ORM;

namespace vJine.Core.Task {
    /// <summary>
    /// 基于话题及优先级的任务队列类型
    /// </summary>
    /// <typeparam name="Tdata">任务数据类型</typeparam>
    public class TaskQueue<Tdata>
        where Tdata : class {
        const int default_capacity = 1;
        const int default_task_max = 1;
        /// <summary>
        /// 初始化任务队列
        /// </summary>
        /// <param name="capacity">队列容量（最多可排队任务数据数量）</param>
        /// <param name="worker">工作者代理</param>
        /// <returns>任务队列</returns>
        public static TaskQueue<Tdata> Init(int capacity, Exec<Tdata> worker) {
            TaskQueue<Tdata> taskQueue = new TaskQueue<Tdata>(default_task_max, capacity, null, worker, null);
            taskQueue.Start();

            return taskQueue;
        }
        /// <summary>
        /// 初始化任务队列
        /// </summary>
        /// <param name="capacity">队列容量（最多可排队任务数据数量）</param>
        /// <param name="task_max">最大并行任务数量</param>
        /// <param name="worker">工作者代理</param>
        /// <returns>任务队列</returns>
        public static TaskQueue<Tdata> Init(int capacity, int task_max, Exec<Tdata> worker) {
            TaskQueue<Tdata> taskQueue = new TaskQueue<Tdata>(task_max, capacity, null, worker, null);
            taskQueue.Start();

            return taskQueue;
        }
        /// <summary>
        /// 初始化任务队列
        /// </summary>
        /// <param name="capacity">队列容量（最多可排队任务数据数量）</param>
        /// <param name="task_max">最大并行任务数量</param>
        /// <param name="keyResource">关键资源</param>
        /// <param name="worker">工作者代理</param>
        /// <returns>任务队列</returns>
        public static TaskQueue<Tdata> Init(int capacity, int task_max, string keyResource, Exec<Tdata> worker) {
            TaskQueue<Tdata> taskQueue = new TaskQueue<Tdata>(task_max, capacity, null, worker, null);
            taskQueue.Start();

            return taskQueue;
        }

        const int Max_priority = 6;
        /// <summary>
        /// 实例化任务队列
        /// </summary>
        public TaskQueue()
            : this(10) {
        }

        int Capacity { get; set; }
        /// <summary>
        /// 实例化任务队列
        /// </summary>
        /// <param name="capacity">队列容量（最多可排队任务数据数量）</param>
        public TaskQueue(int capacity)
            : this(1, capacity, null, null, null) {

        }
        /// <summary>
        /// 实例化任务队列
        /// </summary>
        /// <param name="capacity">队列容量（最多可排队任务数据数量）</param>
        /// <param name="worker">工作者代理</param>
        public TaskQueue(int capacity, Exec<Tdata> worker)
            : this(default_task_max, capacity, null, worker, null) {
        }
        /// <summary>
        /// 实例化任务队列
        /// </summary>
        /// <param name="capacity">队列容量（最多可排队任务数据数量）</param>
        /// <param name="service">服务代理</param>
        public TaskQueue(int capacity, Exec service)
            : this(1, capacity, service, null, null) {
        }
        /// <summary>
        /// 实例化任务队列
        /// </summary>
        /// <param name="capacity">队列容量（最多可排队任务数据数量）</param>
        /// <param name="service">服务代理</param>
        /// <param name="worker">工作者代理</param>
        public TaskQueue(int capacity, Exec service, Exec<Tdata> worker)
            : this(1, capacity, service, worker, null) {
        }
        /// <summary>
        /// 实例化任务队列
        /// </summary>
        /// <param name="task_max">最大并行任务数量</param>
        /// <param name="capacity">队列容量（最多可排队任务数据数量）</param>
        /// <param name="service">服务代理</param>
        /// <param name="worker">工作者代理</param>
        /// <param name="keyResource">关键资源</param>
        public TaskQueue(int task_max, int capacity, Exec service, Exec<Tdata> worker, string keyResource = null) {
            this.Capacity = capacity; this.task_max = task_max;
            this.service = service;
            this.worker = worker;

            this.signal_P_enqueue = new Semaphore(capacity, capacity);
            this.sinal_P_dequeue = new Semaphore(0, capacity);

            this.signal_T_enqueue = new Semaphore(capacity, capacity);
            this.sinal_T_dequeue = new Semaphore(0, capacity);

            for (int i = 0; i <= Max_priority; i++) {
                this.PriorityTasks.Add(i, new Queue<Tdata>());
            }
        }

        #region Priority Queue

        long jobs_count = 0;
        Semaphore signal_P_enqueue; Semaphore sinal_P_dequeue;
        Dictionary<int, Queue<Tdata>> PriorityTasks = new Dictionary<int, Queue<Tdata>>();
        /// <summary>
        /// 以最低优先级将任务数据加入队列
        /// </summary>
        /// <param name="data">任务数据</param>
        public void Enqueue(Tdata data) {
            this.Enqueue(data, Max_priority);
        }
        /// <summary>
        /// 以指定的优先级将任务数据加入队列
        /// </summary>
        /// <param name="data">任务数据</param>
        /// <param name="priority">任务优先级</param>
        public void Enqueue(Tdata data, int priority) {
            if (priority < 0 || priority > Max_priority) {
                throw new CoreException("优先级[{0}]超出范围(0-{1})", priority, Max_priority);
            }

            this.signal_P_enqueue.WaitOne();
            lock (this.PriorityTasks) {
                this.PriorityTasks[priority].Enqueue(data);
                this.jobs_count += 1;
            }

            this.sinal_P_dequeue.Release();
        }
        /// <summary>
        /// 从优先级队列中取出任务数据
        /// </summary>
        /// <param name="delay_ms">等待毫秒数(默认为10)</param>
        /// <returns>任务数据</returns>
        public Tdata Dequeue(int delay_ms = 10) {
            if (delay_ms <= 0) {
                this.sinal_P_dequeue.WaitOne();
            } else {
                if (!this.sinal_P_dequeue.WaitOne(delay_ms)) {
                    return null;
                }
            }

            lock (this.PriorityTasks) {
                for (int i = 0; i <= Max_priority; i++) {
                    if (this.PriorityTasks[i].Count > 0) {
                        Tdata data = this.PriorityTasks[i].Dequeue();
                        jobs_count -= 1;
                        this.signal_P_enqueue.Release();
                        return data;
                    }
                }
            }

            throw new CoreException("TaskQueue.Dequeue Fail");
        }
        #endregion Priority Queue

        #region Topic Queue

        Semaphore signal_T_enqueue; Semaphore sinal_T_dequeue;
        Dictionary<string, Queue<Tdata>> TopicTasks = new Dictionary<string, Queue<Tdata>>();
        /// <summary>
        /// 将指定话题的任务数据加入队列
        /// </summary>
        /// <param name="Topic">任务话题</param>
        /// <param name="data">任务数据</param>
        public void Enqueue(string Topic, Tdata data) {
            if (Topic == null) {
                throw new ArgumentNullException("Topic");
            }

            this.signal_T_enqueue.WaitOne();
            lock (this.TopicTasks) {
                if (!this.TopicTasks.ContainsKey(Topic)) {
                    this.TopicTasks.Add(Topic, new Queue<Tdata>());
                }
                this.TopicTasks[Topic].Enqueue(data);
            }

            this.sinal_T_dequeue.Release();
        }
        /// <summary>
        /// 从队列中取出指定任务话题的任务数据
        /// </summary>
        /// <param name="Topic">任务话题</param>
        /// <param name="delayms">等待毫秒数（默认为10）</param>
        /// <returns>任务数据（超时返回null）</returns>
        public Tdata Dequeue(string Topic, int delayms = 10) {
            if (delayms <= 0) {
                this.sinal_T_dequeue.WaitOne();
            } else {
                if (!this.sinal_T_dequeue.WaitOne(delayms)) {
                    return null;
                }
            }

            lock (this.TopicTasks) {
                if (!this.TopicTasks.ContainsKey(Topic)) {
                    this.sinal_T_dequeue.Release();
                    return null;
                }

                Queue<Tdata> tt = this.TopicTasks[Topic];

                Tdata t = tt.Dequeue();
                if (tt.Count == 0) {
                    this.TopicTasks.Remove(Topic);
                }

                this.signal_T_enqueue.Release();
                return t;
            }
            throw new CoreException("TaskQueue.Dequeue Fail");
        }
        #endregion Topic Queue

        bool IsRunning = false;
        int task_max = 0;
        Semaphore taskSignal = null;
        /// <summary>
        /// 开始执行任务
        /// </summary>
        public void Start() {
            this.Start(this.task_max == 0 ? default_task_max : this.task_max);
        }

        Exec service = null;
        Exec<Tdata> worker = null;
        /// <summary>
        /// 以指定的最大任务数量开始执行任务
        /// </summary>
        /// <param name="max_tasks">最大任务数量</param>
        public void Start(int max_tasks) {

            lock (this) {
                if (this.IsRunning) {
                    throw new CoreException("Task Is In Runnig State");
                }

                this.task_max = max_tasks;

                this.taskSignal =
                    new Semaphore(max_tasks, max_tasks);
                this.IsRunning = true;

                this.asyncCallBack =
                    new AsyncCallback(this.task_complete);

                if (this.service != null) {
                    ThreadPool.QueueUserWorkItem(new WaitCallback((object objNull) => {
                        this.service();
                    }), null);
                }

                if (worker != null) {
                    Exec<Exec<Tdata>> task = this.task_dispatcher;
                    task.BeginInvoke(worker, null, null);
                }
            }
        }

        ManualResetEvent stop_signal = new ManualResetEvent(false);
        /// <summary>
        /// 停止任务
        /// </summary>
        public void Stop() {
            lock (this) {
                this.IsRunning = false;

                //等待任务停止
                this.stop_signal.WaitOne();
                this.stop_signal.Close();
                this.stop_signal = null;

            }
        }

        int task_count = 0;
        AsyncCallback asyncCallBack = null;
        void task_dispatcher(Exec<Tdata> worker) {
            Exec runner = () => {
                while(this.IsRunning) {
                    Tdata data = this.Dequeue(100);
                    if(data == null) {
                        break;
                    }

                    worker(data);
                };
            };

            while (this.IsRunning) {
                if (this.taskSignal.WaitOne(50)) {
                    lock(this) {
                        this.task_count += 1;
                        runner.BeginInvoke(this.asyncCallBack, null);
                    }
                }
            }

            this.stop_signal.Set();
        }

        void task_complete(IAsyncResult result) {
            lock(this) {
                this.task_count -= 1;
            }
            this.taskSignal.Release();
        }
    }
}